package org.databandtech.mysql2kafka;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Simple Flink streaming job that copies String records from one Kafka topic
 * to another, printing each record as it passes through.
 *
 * <p>Reads from {@link #READ_TOPIC} starting at the earliest offset (full
 * re-read on every run), prints the stream, and sinks it to
 * {@link #SINK_TOPIC}. Checkpointing runs every 5 seconds.
 */
public class KafkaToKafka {

	/** Topic consumed from. */
	final static String READ_TOPIC = "Hello-Kafka";
	/** Topic produced to. */
	final static String SINK_TOPIC = "Hello-Kafka123";
	/** Kafka broker address shared by source and sink. */
	final static String BOOTSTRAP_SERVERS = "192.168.10.60:9092";
	/** Checkpoint interval in milliseconds. */
	final static long CHECKPOINT_INTERVAL_MS = 5000L;

	/**
	 * Builds and submits the copy job.
	 *
	 * @param args unused
	 * @throws Exception if job construction or submission fails; propagated so
	 *         the submitting process exits non-zero instead of silently
	 *         swallowing the error (the original printStackTrace() hid it)
	 */
	public static void main(String[] args) throws Exception {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Checkpoint every 5 seconds so the Kafka source offsets are committed
		// and the job can recover without reprocessing everything.
		env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

		DataStream<String> streamInput = env.addSource(buildConsumer());

		// Debug aid: echo every record to stdout of the task managers.
		streamInput.print();

		streamInput.addSink(buildProducer());

		env.execute("ok-kafka");
	}

	/** Creates the Kafka source, configured to re-read the topic from the earliest offset. */
	private static FlinkKafkaConsumer<String> buildConsumer() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
		properties.setProperty("group.id", "test");
		// NOTE(review): "stream.parallelism" is neither a Flink nor a Kafka
		// client config key — it has no effect here. Kept for parity with the
		// original; set parallelism via env.setParallelism(...) instead.
		properties.setProperty("stream.parallelism", "4");

		FlinkKafkaConsumer<String> myConsumer =
				new FlinkKafkaConsumer<>(READ_TOPIC, new SimpleStringSchema(), properties);

		// Start from the earliest record: a full re-ingest on every run.
		myConsumer.setStartFromEarliest();
		// Other start positions, for reference:
		//   myConsumer.setStartFromLatest();                      // newest records only
		//   myConsumer.setStartFromTimestamp(tsMillis);           // from a point in time
		//   myConsumer.setStartFromGroupOffsets();                // default: committed group offsets
		//   myConsumer.setStartFromSpecificOffsets(offsetsMap);   // explicit per-partition offsets

		return myConsumer;
	}

	/** Creates the Kafka sink that writes each String record to {@link #SINK_TOPIC}. */
	private static FlinkKafkaProducer<String> buildProducer() {
		Properties propertiesSink = new Properties();
		propertiesSink.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
		// NOTE(review): the two "stream.*" keys below are not recognized by
		// Flink or the Kafka producer and are effectively ignored. Kept for
		// parity with the original configuration.
		propertiesSink.setProperty("stream.checkpoint.interval", "5000");
		propertiesSink.setProperty("stream.sink.parallelism", "2");

		FlinkKafkaProducer<String> myProducer =
				new FlinkKafkaProducer<String>(SINK_TOPIC, new SimpleStringSchema(), propertiesSink);

		// Kafka 0.10+ can attach the record's event timestamp on write;
		// this call has no effect on earlier Kafka versions.
		myProducer.setWriteTimestampToKafka(true);

		return myProducer;
	}

}
